package com.huan.flink.connect;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

/**
 * 通过 connect 来进行多条流的合并
 * 1、通过connect来进行合流，一次只可合2个流
 * 2、通过connect来合流，流中的元素类型可以不一样
 * 3、通过connect来合流，虽然表面上来看是一个流，但是通过调用map、flatmap等方法的时候，其实是各自处理各自的流，即内部还是分开来处理的
 *
 * @author huan.fu
 * @date 2023/9/23 - 10:53
 */
public class ConnectStreamApplication {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

        // 创建3条数据流
        // 本地通过 nc -lk 8888 启动一个socket， 然后输入数字
        SingleOutputStreamOperator<Integer> source1 = environment.socketTextStream("localhost", 8888).map(Integer::valueOf);
        // 本地通过 nc -lk 9999 启动一个socket，然后输入字符
        DataStreamSource<String> source2 = environment.socketTextStream("localhost", 9999);

        // 将2条流合起来
        source1.connect(source2)
                /**
                 * 第一个参数：source1流的类型
                 * 第二个参数：source2流的类型
                 * 第三个参数：map方法转换后，输出流的类型
                 */
                .map(new CoMapFunction<Integer, String, String>() {
                    @Override
                    public String map1(Integer value) throws Exception {
                        return "来源于数字流:" + value;
                    }

                    @Override
                    public String map2(String value) throws Exception {
                        return "来源于字符流:" + value;
                    }
                })
                .print();

        environment.execute("合流");
    }
}
